Spark 知道 DataFrame 的分区键吗?

Does Spark know the partitioning key of a DataFrame?

我想知道 Spark 是否知道 parquet 文件的分区键并使用此信息来避免随机播放。

上下文:

运行 Spark 2.0.1 运行 本地 SparkSession。我有一个 csv 数据集,我将其作为 parquet 文件保存在我的磁盘上,如下所示:

val df0 = spark
  .read
  .format("csv")
  .option("header", true)
  .option("delimiter", ";")
  .option("inferSchema", false)
  .load("SomeFile.csv"))


val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)

df.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .option("inferSchema", false)
  .save("SomeFile.parquet")

我正在按 numerocarte 列创建 42 个分区。这应该将多个 numerocarte 分组到同一个分区。我不想在 write 时执行 partitionBy("numerocarte") 因为我不希望每张卡有一个分区。将有数百万。

之后,我在另一个脚本中读取了这个 SomeFile.parquet 实木复合地板文件并对其进行了一些操作。特别是我 运行 一个 window function 在它上面,分区是在镶木地板文件重新分区的同一列上完成的。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = spark.read
  .format("parquet")
  .option("header", true)
  .option("inferSchema", false)
  .load("SomeFile.parquet")

val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))

df2.withColumn("NewColumnName",
      sum(col("dollars").over(w))

read 之后,我可以看到 repartition 按预期工作,DataFrame df2 有 42 个分区,每个分区都有不同的卡片。

问题:

  1. Spark 知道数据帧 df2 是按列 numerocarte 分区的吗?
  2. 如果它知道,那么window函数中就不会有洗牌。真的吗?
  3. 如果它不知道,它会在window函数中进行随机播放。真的吗?
  4. 如果它不知道,我该如何告诉 Spark 数据已经按正确的列进行了分区?
  5. 如何检查 DataFrame 的分区键?有这个命令吗?我知道如何检查分区数但如何查看分区键?
  6. 当我在每个步骤后打印文件中的分区数时,我在 read 之后有 42 个分区,在 withColumn 之后有 200 个分区,这表明 Spark 重新分区了我的 DataFrame
  7. 如果我有两个使用同一列重新分区的不同表,连接会使用该信息吗?

Does Spark know that the dataframe df2 is partitioned by column numerocarte?

没有。

If it does not know, how do I tell Spark the data is already partitioned by the right column?

你不知道。仅仅因为您保存了已打乱的数据,并不意味着它将加载相同的拆分。

How can I check a partitioning key of DataFrame?

加载数据后没有分区键,但您可以检查 queryExecution for Partitioner


实践中:

  • 如果你想支持高效的按键下推,使用DataFrameWriterpartitionBy方法。
  • 如果您希望对连接优化提供有限支持,请使用 bucketBy 元存储和持久表。

有关详细示例,请参阅

我正在回答我自己的问题以供将来参考。

根据@user8371915 的建议,bucketBy 成功了!

我正在保存我的 DataFrame df:

df.write
  .bucketBy(250, "userid")
  .saveAsTable("myNewTable")

然后当我需要加载这个时 table:

val df2 = spark.sql("SELECT * FROM myNewTable")

val w = Window.partitionBy("userid")

val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w)
df3.explain

我确认,当我在 userid 分区的 df2 上执行 window 函数时,没有随机播放!谢谢@user8371915!

我在调查中学到的一些东西

  • myNewTable 看起来像一个普通的 parquet 文件,但它不是。你可以用 spark.read.format("parquet").load("path/to/myNewTable") 正常读取它,但是这样创建的 DataFrame 将不会保留原来的分区!您必须使用 spark.sql select 才能正确分区 DataFrame.
  • 您可以使用 spark.sql("describe formatted myNewTable").collect.foreach(println) 查看 table 内部。这将告诉您哪些列用于分桶以及有多少个分桶。
  • Window 利用分区的函数和连接通常也需要排序。您可以在写入时使用 .sortBy() 对存储桶中的数据进行排序,排序也将保留在配置单元 table 中。 df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")
  • 在本地模式下工作时,table myNewTable 会保存到我本地 Scala SBT 项目中的 spark-warehouse 文件夹中。通过spark-submit用mesos集群模式保存时,保存到hive仓库。对我来说它位于 /user/hive/warehouse.
  • 在执行 spark-submit 时,您需要在 SparkSession 中添加两个选项:.config("hive.metastore.uris", "thrift://addres-to-your-master:9083").enableHiveSupport()。否则您创建的配置单元 table 将不可见。
  • 如果您想将 table 保存到特定数据库,请在存储之前执行 spark.sql("USE your database")

更新 05-02-2018

我在使用 spark bucketing 和创建 Hive table 时遇到了一些问题。请参考为什么Spark saveAsTable with bucketBy创建数千个文件中的问题、回复和评论?